
This notebook explores the use of ODC with Dask LocalCluster. The goal is to introduce fundamental concepts and the role Dask can serve with datacube and subsequent computation using xarray.
The example computation is fairly typical of an EO data processing pipeline. We'll be using a small area and time period to start with and progressively scaling this example. EO scientists may find some aspects of these examples unrealistic, but this isn't an EO science course ☺.
The basic workflow is:
- datacube query
- datacube.load()

NOTE: Some cells in this notebook will take minutes to run so please be patient. Also, some cells can load large datasets into Jupyter's memory (based on the defaults), which can exhaust the available memory and cause the kernel to crash. If this occurs: restart the kernel, run the Setup cells, and then jump to the next section. The (text) outputs from previous sections can be retained between kernel restarts. Example times for each exercise:
| Process | N times | Load |
|---|---|---|
| Load data without dask | 7 | 3-3.5 mins |
| Exploring dask concepts with ODC | 2 | 1 min |
| The impact of dask on ODC | 22 | 1-2 mins |
| Exploiting delayed tasks | 7 | 1 min |
| Data and computational locality | 7 | 30-50 secs |
| With compute on the algorithm | 7 | 10 secs |
| With selected measurements | 7 | 10 secs |
import git
import sys, os
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
from dask.distributed import Client, LocalCluster
import datacube
from datacube.utils import masking
from datacube.utils.aws import configure_s3_access
# EASI defaults
os.environ['USE_PYGEOS'] = '0'
repo = git.Repo('.', search_parent_directories=True).working_tree_dir
if repo not in sys.path: sys.path.append(repo)
from easi_tools import EasiDefaults, notebook_utils
easi = EasiDefaults()
client = None
Successfully found configuration for deployment "csiro"
The next cell sets out all the query parameters used in our datacube.load().
For this run we keep the ROI quite small.
# Get the default latitude & longitude extents
study_area_lat = easi.latitude
study_area_lon = easi.longitude
# Or choose your own by uncommenting and modifying this section
###############################################################
# # Central Tasmania (near Little Pine Lagoon)
# central_lat = -42.019
# central_lon = 146.615
# # Set the buffer to load around the central coordinates
# # This is a radial distance from the central coordinates, so the bbox is 2x buffer in each dimension
# buffer = 0.05
# # Compute the bounding box for the study area
# study_area_lat = (central_lat - buffer, central_lat + buffer)
# study_area_lon = (central_lon - buffer, central_lon + buffer)
###############################################################
# Data product
product = easi.product('landsat')
# product = 'landsat8_c2l2_sr'
# Set the date range to load data over
set_time = easi.time
# set_time = ("2021-01-01", "2021-01-31")
# Selected measurement names (used in this notebook). None will load all of them
alias = easi.aliases('landsat')
measurements = None
# measurements = [alias[x] for x in ['qa_band', 'red', 'nir']]
# Set the QA band name and mask values
qa_band = alias['qa_band']
qa_mask = easi.qa_mask('landsat')
# Set the resampling method for the bands
resampling = {qa_band: "nearest", "*": "average"}
# Set the coordinate reference system and output resolution
set_crs = easi.crs('landsat') # If defined, else None
set_resolution = easi.resolution('landsat') # If defined, else None
# set_crs = "epsg:3577"
# set_resolution = (-30, 30)
# Set the scene group_by method
group_by = "solar_day"
Now initialise the datacube.
dc = datacube.Datacube()
# Access AWS "requester-pays" buckets
# This is necessary for reading data from most third-party AWS S3 buckets such as for Landsat and Sentinel-2
from datacube.utils.aws import configure_s3_access
configure_s3_access(aws_unsigned=False, requester_pays=True);
Now load the data. This first dc.load() does not use Dask so it will take a few minutes.
We use %%time to keep track of how long things take to complete.
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling=resampling,
output_crs=set_crs,
resolution=set_resolution,
group_by=group_by,
)
CPU times: user 1min 25s, sys: 12.5 s, total: 1min 38s Wall time: 3min 36s
The result of the datacube.load() function is an xarray.Dataset.
Jupyter notebooks can render a description of the xarray dataset variable with a lot of useful information about the structure of data.
dataset
<xarray.Dataset>
Dimensions: (time: 7, y: 2013, x: 1709)
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:55.714...
* y (y) float64 -3.988e+06 -3.988e+06 ... -4.049e+06
* x (x) float64 1.328e+06 1.328e+06 ... 1.379e+06
spatial_ref int32 3577
Data variables: (12/22)
nbart_coastal_aerosol (time, y, x) int16 -999 -999 -999 ... 972 973
nbart_blue (time, y, x) int16 -999 -999 -999 ... 978 980
nbart_green (time, y, x) int16 -999 -999 -999 ... 1053 1082
nbart_red (time, y, x) int16 -999 -999 -999 ... 1119 1155
nbart_nir (time, y, x) int16 -999 -999 -999 ... 2676 2631
nbart_swir_1 (time, y, x) int16 -999 -999 -999 ... 1912 2136
... ...
oa_relative_slope (time, y, x) float32 26.82 25.31 ... 95.97 114.5
oa_satellite_azimuth (time, y, x) float32 100.1 100.1 ... 100.2 100.2
oa_satellite_view (time, y, x) float32 9.022 9.02 ... 3.36 3.358
oa_solar_azimuth (time, y, x) float32 67.9 67.9 ... 47.08 47.08
oa_solar_zenith (time, y, x) float32 39.49 39.49 ... 50.96 50.96
oa_time_delta (time, y, x) float32 -0.4956 -0.4971 ... 5.429
Attributes:
crs: EPSG:3577
    grid_mapping:          spatial_ref

Open the Data variables (click "‣ Data variables") and click on the stacked cylinders for one of them. You will see the actual data array is available and shown in summary form.
NOTE that you can see real numbers in the array when you do this. This will change when we start using Dask.
This graphical summary of an xarray variable will become increasingly important when dask is enabled and as scale-out occurs, so take a moment now to poke around the interface. Depending on your area of interest set above, you should have a relatively small area: perhaps around 300 to 400 pixels in each of the x and y dimensions and perhaps up to 10 time slices. This is small enough to process without using Dask.
Next up, filter out pixels that are affected by clouds and other issues, and compute the NDVI. Since we aren't specifying a time range, this will be performed for all time slices.
%%time
# Identify pixels that don't have cloud, cloud shadow or water
from datacube.utils import masking
cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free[alias['nir']] - cloud_free[alias['red']]
band_sum = cloud_free[alias['nir']] + cloud_free[alias['red']]
# Calculate NDVI
ndvi = None
ndvi = band_diff / band_sum
CPU times: user 841 ms, sys: 1.08 s, total: 1.92 s Wall time: 1.92 s
The result ndvi is an xarray.DataArray. Let's take a look at it. Again the notebook will render an html version of the data in summary form.
Notice again the actual data values are being shown and that there are the same number of time slices as above and the x and y dimensions are identical.
ndvi
<xarray.DataArray (time: 7, y: 2013, x: 1709)>
array([[[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan],
...,
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan]],
[[0.21968003, 0.21452976, 0.20806599, ..., 0.34342706,
0.35976648, 0.37791829],
[0.19766035, 0.19309514, 0.18671527, ..., 0.3560434 ,
0.36472946, 0.36527237],
[0.19287958, 0.18996865, 0.18799172, ..., 0.35883703,
0.36058175, 0.3493558 ],
...
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan],
[ nan, nan, nan, ..., nan,
nan, nan]],
[[ nan, nan, nan, ..., 0.62104238,
0.66167231, 0.69073634],
[ nan, nan, nan, ..., 0.64705882,
0.67499382, 0.6999766 ],
[ nan, nan, nan, ..., 0.65460186,
0.68363546, 0.7074894 ],
...,
[0.66087613, 0.63791103, 0.62416107, ..., nan,
nan, nan],
[0.66942771, 0.64366472, 0.64570064, ..., nan,
nan, nan],
[0.61594733, 0.56981982, 0.56342412, ..., nan,
nan, nan]]])
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:55.714845 ... 2020-03...
* y (y) float64 -3.988e+06 -3.988e+06 ... -4.049e+06 -4.049e+06
* x (x) float64 1.328e+06 1.328e+06 ... 1.379e+06 1.379e+06
    spatial_ref  int32 3577

Raw numbers aren't nice to look at so let's draw a time slice. We'll select just one to draw, picking one that wasn't completely masked out by cloud. You can see that all clouds and water have been masked out, so we are just looking at the NDVI of the land area.
ndvi.isel(time=1).plot()
<matplotlib.collections.QuadMesh at 0x7fb3eb52dcc0>
Let's set our time range to a couple of weeks, or approximately two passes of Landsat 8 for this ROI. Less data will allow us to explore how dask works with the datacube and xarray libraries.
set_time = (set_time[0], parse(set_time[0]) + relativedelta(weeks=3))
# set_time = ("2021-01-01", "2021-01-14")
set_time
('2020-02-01', datetime.datetime(2020, 2, 22, 0, 0))
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling=resampling,
output_crs=set_crs,
resolution=set_resolution,
group_by=group_by,
)
dataset
CPU times: user 24.3 s, sys: 3.54 s, total: 27.8 s Wall time: 51.5 s
<xarray.Dataset>
Dimensions: (time: 2, y: 2013, x: 1709)
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:55.714...
* y (y) float64 -3.988e+06 -3.988e+06 ... -4.049e+06
* x (x) float64 1.328e+06 1.328e+06 ... 1.379e+06
spatial_ref int32 3577
Data variables: (12/22)
nbart_coastal_aerosol (time, y, x) int16 -999 -999 -999 ... 478 499
nbart_blue (time, y, x) int16 -999 -999 -999 ... 491 534
nbart_green (time, y, x) int16 -999 -999 -999 ... 635 682
nbart_red (time, y, x) int16 -999 -999 -999 ... 738 845
nbart_nir (time, y, x) int16 -999 -999 -999 ... 2238 2334
nbart_swir_1 (time, y, x) int16 -999 -999 -999 ... 1883 2178
... ...
oa_relative_slope (time, y, x) float32 26.82 25.31 ... 141.0 171.0
oa_satellite_azimuth (time, y, x) float32 100.1 100.1 ... 280.1 280.1
oa_satellite_view (time, y, x) float32 9.022 9.02 ... 7.737 7.74
oa_solar_azimuth (time, y, x) float32 67.9 67.9 ... 62.42 62.42
oa_solar_zenith (time, y, x) float32 39.49 39.49 ... 39.6 39.6
oa_time_delta (time, y, x) float32 -0.4956 -0.4971 ... 1.099
Attributes:
crs: EPSG:3577
    grid_mapping:          spatial_ref

As before you can see the actual data in the results, but this time there should only be 1 or 2 observation times.
Now let's create a LocalCluster as we did in the earlier notebook.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
client
Client-6b773375-0464-11ee-926b-06a3e810ea18
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: http://127.0.0.1:8787/status |
2558b75e
| Dashboard: http://127.0.0.1:8787/status | Workers: 4 |
| Total threads: 8 | Total memory: 24.00 GiB |
| Status: running | Using processes: True |
Scheduler-9d67f45d-1c7d-4c94-8ea4-b0b4484205a4
| Comm: tcp://127.0.0.1:45489 | Workers: 4 |
| Dashboard: http://127.0.0.1:8787/status | Total threads: 8 |
| Started: Just now | Total memory: 24.00 GiB |
| Comm: tcp://127.0.0.1:33385 | Total threads: 2 |
| Dashboard: http://127.0.0.1:45065/status | Memory: 6.00 GiB |
| Nanny: tcp://127.0.0.1:43585 | |
| Local directory: /tmp/dask-worker-space/worker-81no5071 | |
| Comm: tcp://127.0.0.1:35215 | Total threads: 2 |
| Dashboard: http://127.0.0.1:42473/status | Memory: 6.00 GiB |
| Nanny: tcp://127.0.0.1:46689 | |
| Local directory: /tmp/dask-worker-space/worker-0z3hhmct | |
| Comm: tcp://127.0.0.1:39851 | Total threads: 2 |
| Dashboard: http://127.0.0.1:41123/status | Memory: 6.00 GiB |
| Nanny: tcp://127.0.0.1:46171 | |
| Local directory: /tmp/dask-worker-space/worker-1x8q2iow | |
| Comm: tcp://127.0.0.1:43485 | Total threads: 2 |
| Dashboard: http://127.0.0.1:39837/status | Memory: 6.00 GiB |
| Nanny: tcp://127.0.0.1:43977 | |
| Local directory: /tmp/dask-worker-space/worker-st0xxu0o | |
You may like to open up the dashboard for the cluster, although for this notebook we won't be talking about the dashboard (that's for a later discussion).
notebook_utils.localcluster_dashboard(client=client, server=easi.hub)
'https://hub.csiro.easi-eo.solutions/user/csiro-csiro-aad_pag064@csiro.au/proxy/8787/status'
Now that we are using a cluster, even though it is local, we need to make sure that our cluster has the right configuration to use Requester Pays buckets in AWS S3. To do this, we need to re-run the configure_s3_access() function that we ran earlier, but we need to pass the client to the function as well.
from datacube.utils.aws import configure_s3_access
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
datacube.load() will use the default dask cluster (the one we just created) if the dask_chunks parameter is specified.
The chunk shape and memory size is a critical parameter in tuning dask, and we will discuss it in more detail as scale increases. For now we're simply going to specify that the time dimension should be chunked individually (1 slice of time per chunk); by not specifying any chunking for the other dimensions, they will form a single contiguous block.
If that made no sense whatsoever, that's fine because we will look at an example.
chunks = {"time":1}
%%time
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling=resampling,
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = chunks, ###### THIS IS THE ONLY LINE CHANGED. #####
group_by=group_by,
)
dataset
CPU times: user 184 ms, sys: 10.4 ms, total: 194 ms Wall time: 230 ms
<xarray.Dataset>
Dimensions: (time: 2, y: 2013, x: 1709)
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:55.714...
* y (y) float64 -3.988e+06 -3.988e+06 ... -4.049e+06
* x (x) float64 1.328e+06 1.328e+06 ... 1.379e+06
spatial_ref int32 3577
Data variables: (12/22)
nbart_coastal_aerosol (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
nbart_blue (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
nbart_green (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
nbart_red (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
nbart_nir (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
nbart_swir_1 (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
... ...
oa_relative_slope (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
oa_satellite_azimuth (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
oa_satellite_view (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
oa_solar_azimuth (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
oa_solar_zenith (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
oa_time_delta (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
Attributes:
crs: EPSG:3577
    grid_mapping:          spatial_ref

The first thing you probably noticed is that while only one line changed, the load time dropped to sub-seconds!
The second thing you probably noticed is if you look at one of the data variables by clicking on the database icon as before, there is no data but instead there is a diagram which shows you the Dask Chunks for each measurement. It's really fast because it didn't actually load any data!
When datacube has dask_chunks specified it switches from numpy-backed xarrays to dask.array-backed ones and lazily loads them - this means that no data is loaded until it is used. If you look at one of the data variables you will see it now has dask.array<chunksize=(....)> rather than values, and the cylinder icon will show the Array and Chunk parameters along with some statistics, not actual data.
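The same lazy behaviour can be seen with a plain dask.array, independent of ODC. A minimal sketch (the shape and chunking mirror this notebook's load, but the data here is synthetic):

```python
import dask.array as da

# Building the array creates a task graph, not data
a = da.ones((2, 2013, 1709), chunks=(1, 2013, 1709), dtype="int16")
b = (a * 2).mean(axis=(1, 2))  # still lazy: the graph just grows

# Work only happens when the result is requested
print(b.compute())  # one mean per time-slice chunk
```

Until `compute()` is called, `a` and `b` are just descriptions of work, which is why the `dc.load()` above returned almost instantly.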
The datacube.load() has used Dask's delayed interface, which does not perform any tasks (Dask's name for units of computation) until the result of the task is actually required. We'll load the data in a moment, but first let's take a look at the parameters in that pretty visualisation. Click on the cylinder for the red Data variable and look at the table and the figure. It should look similar to the image below.
Looking at this image (yours may be different), you can see that:
- The full array is 221.92 kiB in total size and is broken into Chunks of 110.96 kiB each.
- The array shape is (2, 375, 303) (time, y, x) but each chunk is (1, 375, 303) because we specified the time dimension should have chunks of length 1.
- There are 2 chunk tasks, one for each time slice, and in this instance one graph layer. More complex calculations will have more layers in the graph.
- The data type is uint16 and the array is split up into chunks which are numpy.ndarrays.

The chunking has split the array loading into two Chunks. Dask can execute these in parallel.
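The sizes in such a table are just shape × itemsize. As a quick back-of-envelope check, here is the arithmetic for the chunk shape produced by the dc.load() above ((1, 2013, 1709), int16); substitute the shape and dtype from your own table:

```python
import numpy as np

chunk_shape = (1, 2013, 1709)          # one time slice per chunk
itemsize = np.dtype("int16").itemsize  # 2 bytes per pixel

chunk_bytes = int(np.prod(chunk_shape)) * itemsize
print(f"{chunk_bytes / 2**20:.2f} MiB per chunk")
```

This is the number to keep an eye on as we scale: chunk size determines how much memory each worker needs per task.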
We can look at the delayed tasks and how they will be executed by visualising the task graph for one of the variables. We'll use the red band measurement.
dataset[alias['red']].data.visualize()
Details on the task graph can be found in the dask user guide, but what's clear is that you have two independent paths of execution which produce one time slice each: (0,0,0) and (1,0,0). These are the two chunks that the full array has been split into.
To retrieve the actual data we need to compute() the result. This causes all the delayed tasks to be executed for the variable we are computing. Let's compute() the red variable.
%%time
actual_red = dataset[alias['red']].compute()
actual_red
CPU times: user 87.4 ms, sys: 29.3 ms, total: 117 ms Wall time: 2.02 s
<xarray.DataArray 'nbart_red' (time: 2, y: 2013, x: 1709)>
array([[[-999, -999, -999, ..., 2137, 2148, 2168],
[-999, -999, -999, ..., 2111, 2122, 2158],
[-999, -999, -999, ..., 2103, 2123, 2150],
...,
[4912, 4938, 4919, ..., 3382, 3447, 3421],
[4883, 4911, 4901, ..., 3446, 3527, 3483],
[4880, 4915, 4905, ..., 3538, 3596, 3514]],
[[2268, 2184, 2160, ..., 1366, 1316, 1279],
[1989, 2010, 2008, ..., 1276, 1268, 1305],
[1927, 1938, 1961, ..., 1257, 1275, 1313],
...,
[ 486, 328, 294, ..., 1339, 1273, 1198],
[ 386, 274, 252, ..., 1042, 1016, 959],
[ 332, 279, 273, ..., 665, 738, 845]]], dtype=int16)
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:55.714845 2020-02-16T...
* y (y) float64 -3.988e+06 -3.988e+06 ... -4.049e+06 -4.049e+06
* x (x) float64 1.328e+06 1.328e+06 ... 1.379e+06 1.379e+06
spatial_ref int32 3577
Attributes:
units: 1
nodata: -999
crs: EPSG:3577
    grid_mapping:  spatial_ref

As you can see we now have actual data (real numbers, not just Dask arrays). You can do the same thing for all arrays in the dataset in one go by computing the dataset itself.
%%time
actual_dataset = dataset.compute()
actual_dataset
CPU times: user 596 ms, sys: 430 ms, total: 1.03 s Wall time: 9.5 s
<xarray.Dataset>
Dimensions: (time: 2, y: 2013, x: 1709)
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:55.714...
* y (y) float64 -3.988e+06 -3.988e+06 ... -4.049e+06
* x (x) float64 1.328e+06 1.328e+06 ... 1.379e+06
spatial_ref int32 3577
Data variables: (12/22)
nbart_coastal_aerosol (time, y, x) int16 -999 -999 -999 ... 478 499
nbart_blue (time, y, x) int16 -999 -999 -999 ... 491 534
nbart_green (time, y, x) int16 -999 -999 -999 ... 635 682
nbart_red (time, y, x) int16 -999 -999 -999 ... 738 845
nbart_nir (time, y, x) int16 -999 -999 -999 ... 2238 2334
nbart_swir_1 (time, y, x) int16 -999 -999 -999 ... 1883 2178
... ...
oa_relative_slope (time, y, x) float32 26.82 25.31 ... 141.0 171.0
oa_satellite_azimuth (time, y, x) float32 100.1 100.1 ... 280.1 280.1
oa_satellite_view (time, y, x) float32 9.022 9.02 ... 7.737 7.74
oa_solar_azimuth (time, y, x) float32 67.9 67.9 ... 62.42 62.42
oa_solar_zenith (time, y, x) float32 39.49 39.49 ... 39.6 39.6
oa_time_delta (time, y, x) float32 -0.4956 -0.4971 ... 1.099
Attributes:
crs: EPSG:3577
    grid_mapping:          spatial_ref

From the above we can see that specifying dask_chunks in datacube.load() splits the load() operation into a set of chunk-shaped arrays and delayed tasks. Dask can now perform those tasks in parallel. Dask will only compute the results for the parts of the data we are using, but we can force the computation of all the delayed tasks using compute().
There is a lot more opportunity than described in this simple example but let's just focus on the impact of dask on ODC for this simple case.
The time period and ROI are far too small to be interesting so let's change our time range to a few months of data.
set_time = (set_time[0], parse(set_time[0]) + relativedelta(months=6))
# set_time = ("2021-01-01", "2021-06-30")
set_time
('2020-02-01', datetime.datetime(2020, 8, 1, 0, 0))
We skip loading this longer time range (larger data selection) without dask because it can take many minutes and may use more than the available memory in the Jupyter node.
Let's enable dask and then do the load. We're chunking by time (length one) so dask will be able to load each time slice in parallel. The data variables are also independent so will be done in parallel as well.
if client is None:
cluster = LocalCluster()
client = Client(cluster)
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
display(notebook_utils.localcluster_dashboard(client=client, server=easi.hub))
else:
client.restart()
'https://hub.csiro.easi-eo.solutions/user/csiro-csiro-aad_pag064@csiro.au/proxy/8787/status'
%%time
chunks = {"time":1}
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling=resampling,
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = chunks, ###### THIS IS THE ONLY LINE CHANGED. #####
group_by=group_by,
)
dataset
CPU times: user 287 ms, sys: 26.3 ms, total: 313 ms Wall time: 330 ms
<xarray.Dataset>
Dimensions: (time: 22, y: 2013, x: 1709)
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:55.714...
* y (y) float64 -3.988e+06 -3.988e+06 ... -4.049e+06
* x (x) float64 1.328e+06 1.328e+06 ... 1.379e+06
spatial_ref int32 3577
Data variables: (12/22)
nbart_coastal_aerosol (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
nbart_blue (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
nbart_green (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
nbart_red (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
nbart_nir (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
nbart_swir_1 (time, y, x) int16 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
... ...
oa_relative_slope (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
oa_satellite_azimuth (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
oa_satellite_view (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
oa_solar_azimuth (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
oa_solar_zenith (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
oa_time_delta (time, y, x) float32 dask.array<chunksize=(1, 2013, 1709), meta=np.ndarray>
Attributes:
crs: EPSG:3577
    grid_mapping:          spatial_ref

Woah!! That was fast - but we didn't actually compute anything, so no load has occurred and all tasks are pending. Open up the Data variables, click the stacked cylinders and take a look at the delayed task counts. These exist for every variable.
Let's visualise the task graph for the red band.
dataset[alias['red']].data.visualize()
Well that's not as useful, is it!
You should just be able to make out that each of the chunks can load() independently. The time chunk length is 1, so these are individual time slices. This holds true for all the bands, so dask can spread the loads across multiple threads.
Tip: Visualising task graphs is less effective as your task graph complexity increases. You may need to use simpler examples to see what is going on.
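In that spirit, a tiny stand-in array lets you gauge graph size before attempting to draw the real thing. A sketch (not ODC-specific; the shapes are arbitrary):

```python
import dask.array as da

# A deliberately tiny stand-in: 4 "time" slices, one chunk each
a = da.ones((4, 10, 10), chunks=(1, 10, 10))
masked = a / (a + a)

# Count tasks instead of drawing them when graphs get large
graph = dict(masked.__dask_graph__())
print(len(graph), "tasks")  # small enough that .visualize() stays readable
```

If the count runs into the thousands, `.visualize()` output will be unreadable and counting (or the dashboard) is the better diagnostic.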
Let's get the actual data
%%time
actual_dataset = dataset.compute()
actual_dataset
/env/lib/python3.10/site-packages/rasterio/warp.py:344: NotGeoreferencedWarning: Dataset has no geotransform, gcps, or rpcs. The identity matrix will be returned. _reproject(
CPU times: user 14.5 s, sys: 5.37 s, total: 19.9 s Wall time: 1min 38s
<xarray.Dataset>
Dimensions: (time: 22, y: 2013, x: 1709)
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:55.714...
* y (y) float64 -3.988e+06 -3.988e+06 ... -4.049e+06
* x (x) float64 1.328e+06 1.328e+06 ... 1.379e+06
spatial_ref int32 3577
Data variables: (12/22)
nbart_coastal_aerosol (time, y, x) int16 -999 -999 -999 ... -999 -999
nbart_blue (time, y, x) int16 -999 -999 -999 ... -999 -999
nbart_green (time, y, x) int16 -999 -999 -999 ... -999 -999
nbart_red (time, y, x) int16 -999 -999 -999 ... -999 -999
nbart_nir (time, y, x) int16 -999 -999 -999 ... -999 -999
nbart_swir_1 (time, y, x) int16 -999 -999 -999 ... -999 -999
... ...
oa_relative_slope (time, y, x) float32 26.82 25.31 ... -155.9
oa_satellite_azimuth (time, y, x) float32 100.1 100.1 ... 280.2 280.2
oa_satellite_view (time, y, x) float32 9.022 9.02 ... 7.95 7.952
oa_solar_azimuth (time, y, x) float32 67.9 67.9 ... 35.28 35.28
oa_solar_zenith (time, y, x) float32 39.49 39.49 ... 64.21 64.21
oa_time_delta (time, y, x) float32 -0.4956 -0.4971 ... 1.008
Attributes:
crs: EPSG:3577
    grid_mapping:          spatial_ref

How fast this step is will depend on how many cores are in your Jupyter notebook's local cluster. For an 8-core cluster, the datacube.load() may take between 1/4 and 1/6 of the time compared to without dask (not shown), depending on many factors. This is great!
Why not 1/8 of the time?
Dask has overheads, and datacube.load() itself is IO limited. There are all sorts of things that limit scaling, and part of the art of parallel computing is tuning your algorithm to reduce their impact and achieve greater performance. As we scale up this example we'll explore some of these.
Tip: recent updates to Dask have greatly improved performance and we are now seeing more substantial performance gains, more in line with the increase in cores.
Do not always expect 8x as many cores to produce an 8x speed up. Algorithms can be tuned to perform better (or worse) as scale increases. This is part of the art of parallel programming. Dask does its best, and you can often do better.
Now let's repeat the full example, with NDVI calculation and masking, in a single cell with dask and compute to load the data in. We get the total time for later comparison.
Most of the time (not shown) is in the data load and the NDVI calculation is < 1 second.
To ensure comparable timings, we will .restart() the Dask cluster. This makes sure that we aren't just seeing performance gains from data caching.
Note that this may show some Restarting worker warnings. That is ok; it is just telling you that each of the four workers in the cluster is restarting.
if client is None:
cluster = LocalCluster()
client = Client(cluster)
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
display(notebook_utils.localcluster_dashboard(client=client, server=easi.hub))
else:
client.restart()
'https://hub.csiro.easi-eo.solutions/user/csiro-csiro-aad_pag064@csiro.au/proxy/8787/status'
%%time
chunks = {"time":1}
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling=resampling,
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = chunks,
group_by=group_by,
)
actual_dataset = dataset.compute() ### Compute the dataset ###
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(actual_dataset[qa_band], **qa_mask)
# Apply the mask
cloud_free = actual_dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free[alias['nir']] - cloud_free[alias['red']]
band_sum = cloud_free[alias['nir']] + cloud_free[alias['red']]
# Calculate NDVI
ndvi = None
ndvi = band_diff / band_sum
CPU times: user 5.69 s, sys: 2.67 s, total: 8.36 s Wall time: 31.8 s
Quicker, but we can do better...
When compute() is called, dask not only executes all the tasks but consolidates all the distributed chunks back into a normal array on the client machine - in this case the notebook's kernel. In the previous cell we have two variables that both refer to the data we are loading:

- dataset is the delayed version of the data. The delayed tasks and the chunks that make it up will be on the cluster.
- actual_dataset is the computed result, consolidated into the notebook kernel's memory.

So in the previous cell everything after the actual_dataset = dataset.compute() line is computed in the Jupyter kernel and doesn't use the dask cluster at all for computation.
If we shift the location of this compute() call we can perform more tasks in parallel on the dask cluster.
Tip: Locality is an important concept and applies to both data and computation
Now let's repeat the load and NDVI calculation, but this time rather than compute() on the full dataset we'll run the compute at the cloud masking step (cloud_free = dataset.where(cloud_free_mask).compute()) so the masking operation can be performed in parallel. Let's see what the impact is...
if client is None:
cluster = LocalCluster()
client = Client(cluster)
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
display(notebook_utils.localcluster_dashboard(client=client, server=easi.hub))
else:
client.restart()
'https://hub.csiro.easi-eo.solutions/user/csiro-csiro-aad_pag064@csiro.au/proxy/8787/status'
%%time
chunks = {"time":1}
dataset = None # clear results from any previous runs
dataset = dc.load(
product=product,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling=resampling,
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = chunks,
group_by=group_by,
)
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask).compute() ### COMPUTE MOVED HERE ###
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free[alias['nir']] - cloud_free[alias['red']]
band_sum = cloud_free[alias['nir']] + cloud_free[alias['red']]
# Calculate NDVI
ndvi = None
ndvi = band_diff / band_sum
actual_ndvi = ndvi
CPU times: user 3.3 s, sys: 2.87 s, total: 6.16 s Wall time: 30.3 s
A few seconds quicker but not that different. This isn't too surprising since the masking operation is pretty quick (it's all numpy) and the data load is the bulk of the processing.
Dask can see the entire task graph for both load and mask computation. As a result some of the computation can be performed concurrently with file IO, and CPUs are busier as a result, so it will be slightly faster in practice but with IO dominating we won't see much overall improvement.
Perhaps doing more of the calculation on the cluster will help. Let's also move ndvi.compute() so the entire calculation is done on the cluster and only the final result returned to the client.
if client is None:
    cluster = LocalCluster()
    client = Client(cluster)
    configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
    display(notebook_utils.localcluster_dashboard(client=client, server=easi.hub))
else:
    client.restart()
'https://hub.csiro.easi-eo.solutions/user/csiro-csiro-aad_pag064@csiro.au/proxy/8787/status'
%%time
chunks = {"time":1}
dataset = None # clear results from any previous runs
dataset = dc.load(
    product=product,
    x=study_area_lon,
    y=study_area_lat,
    time=set_time,
    measurements=measurements,
    resampling=resampling,
    output_crs=set_crs,
    resolution=set_resolution,
    dask_chunks=chunks,
    group_by=group_by,
)
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free[alias['nir']] - cloud_free[alias['red']]
band_sum = cloud_free[alias['nir']] + cloud_free[alias['red']]
# Calculate NDVI from the masked bands
ndvi = None
ndvi = band_diff / band_sum
actual_ndvi = ndvi.compute() ### COMPUTE MOVED HERE ###
CPU times: user 755 ms, sys: 257 ms, total: 1.01 s
Wall time: 6.03 s
Now we are seeing a huge difference!
You may be thinking: "Hold on a sec, the NDVI calculation is pretty quick with such a small dataset, so why such a big difference?" You'd be right; there is more going on.
Remember that dataset is a task graph of delayed tasks waiting to be executed when the result is required. The example dataset has many data variables available, but only three are used to produce the NDVI (qa_band, red and nir). Dask therefore never loads the other variables, and because the computation time in this case is mostly IO, execution is a lot faster.
Of course we can save dask the trouble of figuring this out on our behalf and only load() the measurements we need in the first place. Let's check that now; we should see a similar performance figure.
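This culling behaviour can be reproduced with `dask.delayed`; the `load_band` function below is a hypothetical stand-in for the real file IO:

```python
import dask

touched = []

@dask.delayed
def load_band(name):
    # stand-in for file IO; records which bands actually get read
    touched.append(name)
    return 2.0 if name == "nir" else 1.0

red = load_band("red")
nir = load_band("nir")
swir = load_band("swir")   # present as a task, but unused below

ndvi = (nir - red) / (nir + red)
value = ndvi.compute()

# dask only executes the tasks the result depends on: swir is never loaded
assert "swir" not in touched
```

Only the `red` and `nir` tasks run; the unused `swir` load never executes, which is exactly why the ODC example above sped up.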
if client is None:
    cluster = LocalCluster()
    client = Client(cluster)
    configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
    display(notebook_utils.localcluster_dashboard(client=client, server=easi.hub))
else:
    client.restart()
2023-06-06 12:41:28,483 - distributed.nanny - WARNING - Restarting worker
2023-06-06 12:41:28,498 - distributed.nanny - WARNING - Restarting worker
2023-06-06 12:41:28,499 - distributed.nanny - WARNING - Restarting worker
2023-06-06 12:41:28,507 - distributed.nanny - WARNING - Restarting worker
%%time
chunks = {"time":1}
measurements = [alias[x] for x in ['qa_band', 'red', 'nir']]
dataset = None # clear results from any previous runs
dataset = dc.load(
    product=product,
    x=study_area_lon,
    y=study_area_lat,
    time=set_time,
    measurements=measurements,
    resampling=resampling,
    output_crs=set_crs,
    resolution=set_resolution,
    dask_chunks=chunks,
    group_by=group_by,
)
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free[alias['nir']] - cloud_free[alias['red']]
band_sum = cloud_free[alias['nir']] + cloud_free[alias['red']]
# Calculate NDVI from the masked bands
ndvi = None
ndvi = band_diff / band_sum
actual_ndvi = ndvi.compute()
CPU times: user 387 ms, sys: 192 ms, total: 579 ms
Wall time: 4.92 s
Pretty similar, as expected, but again a slight improvement, because now there is less overhead and a smaller task graph.
So it can pay to give dask a hand and not clutter the task graph with tasks you aren't going to use. Still, it's nice to know that dask can save you some time by only computing what is required when you need it.
For completeness we'll take a look at the task graph for the full calculation, all the way to the NDVI result. Given the complexity of the full graph, we'll simplify it to 2 time observations, as we did when the task graph was introduced earlier.
set_time = (set_time[0], parse(set_time[0]) + relativedelta(weeks=3))
# set_time = ("2021-01-01", "2021-01-14")
set_time
('2020-02-01', datetime.datetime(2020, 2, 22, 0, 0))
if client is None:
    cluster = LocalCluster()
    client = Client(cluster)
    configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
    display(notebook_utils.localcluster_dashboard(client=client, server=easi.hub))
else:
    client.restart()
2023-06-06 12:42:20,790 - distributed.nanny - WARNING - Restarting worker
2023-06-06 12:42:20,796 - distributed.nanny - WARNING - Restarting worker
2023-06-06 12:42:20,847 - distributed.nanny - WARNING - Restarting worker
2023-06-06 12:42:20,853 - distributed.nanny - WARNING - Restarting worker
%%time
dataset = None # clear results from any previous runs
measurements = [alias[x] for x in ['qa_band', 'red', 'nir']]
dataset = dc.load(
    product=product,
    x=study_area_lon,
    y=study_area_lat,
    time=set_time,
    measurements=measurements,
    resampling=resampling,
    output_crs=set_crs,
    resolution=set_resolution,
    dask_chunks=chunks,
    group_by=group_by,
)
# Identify pixels that don't have cloud, cloud shadow or water
cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free[alias['nir']] - cloud_free[alias['red']]
band_sum = cloud_free[alias['nir']] + cloud_free[alias['red']]
# Calculate NDVI from the masked bands
ndvi = None
ndvi = band_diff / band_sum
CPU times: user 48.5 ms, sys: 3.19 ms, total: 51.7 ms
Wall time: 72 ms
ndvi.data.visualize()
The computation flows from bottom to top in the task graph. You can see there are two main paths, one for each time step (since the time chunks have length 1), and that the three data variables are loaded independently. After that it gets a little harder to follow, but you can see qa_band being used to produce the mask (and_, eq), which is then combined with the other two data variables via the where function. Finally comes the NDVI calculation itself: a sub, an add and a divide (truediv).
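The same shape of graph can be reproduced in miniature with plain dask arrays; the toy `qa`, `red` and `nir` values below are made up and just mimic the structure:

```python
import dask.array as da

qa = da.from_array([1, 2, 1, 2], chunks=2)
red = da.ones(4, chunks=2)
nir = da.ones(4, chunks=2) * 3

mask = qa == 1                               # the `eq` nodes in the graph
red_m = da.where(mask, red, float("nan"))    # the `where` nodes
nir_m = da.where(mask, nir, float("nan"))
ndvi = (nir_m - red_m) / (nir_m + red_m)     # sub, add and truediv nodes

out = ndvi.compute()   # 0.5 where the mask holds, NaN elsewhere
```

Calling `ndvi.data.visualize()` on this toy version produces the same node names (eq, where, sub, add, truediv) as the full graph above, just with far fewer chunks.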
Dask has many internal optimizations that help it identify the dependencies and parallel components of a task graph. Where possible it will reorder or prune operations to further optimize, for example by not loading data variables that aren't used in the NDVI calculation.
Tip: The task graph can be complex but it is a useful tool in understanding your algorithm and how it scales.
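One way to gauge that complexity without rendering the graph is to count its tasks; this sketch assumes the standard dask collection hook `__dask_graph__` (which behaves as a mapping of task keys):

```python
import dask.array as da

x = da.ones((6,), chunks=3)
expr = ((x + 1) * 2).sum()

# Every dask collection carries its graph; the task count is a rough
# measure of how much work visualize() would have to draw
n_tasks = len(dict(expr.__dask_graph__()))
print(n_tasks)
```

If the count runs into the tens of thousands, `visualize()` will be unreadable; inspect a reduced query (fewer times, fewer measurements) instead, as we did above.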
client.close()
cluster.close()